1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: SessionConsumer.java,v 1.4 2007/01/24 12:00:28 tanderson Exp $
44 */
45 package org.exolab.jms.server;
46
47 import org.apache.commons.logging.Log;
48 import org.apache.commons.logging.LogFactory;
49 import org.exolab.jms.client.JmsMessageListener;
50 import org.exolab.jms.message.MessageImpl;
51 import org.exolab.jms.messagemgr.Condition;
52 import org.exolab.jms.messagemgr.ConsumerEndpoint;
53 import org.exolab.jms.messagemgr.ConsumerEndpointListener;
54 import org.exolab.jms.messagemgr.Flag;
55 import org.exolab.jms.messagemgr.MessageHandle;
56 import org.exolab.jms.messagemgr.QueueBrowserEndpoint;
57 import org.exolab.jms.messagemgr.TimedCondition;
58 import org.exolab.jms.persistence.DatabaseService;
59 import org.exolab.jms.persistence.PersistenceException;
60 import org.exolab.jms.scheduler.Scheduler;
61 import org.exolab.jms.scheduler.SerialTask;
62
63 import javax.jms.JMSException;
64 import java.rmi.RemoteException;
65 import java.util.ArrayList;
66 import java.util.HashMap;
67 import java.util.Iterator;
68 import java.util.LinkedList;
69 import java.util.List;
70
71
72 /***
73 * Manages all consumers for a session.
74 *
75 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
76 * @version $Revision: 1.4 $ $Date: 2007/01/24 12:00:28 $
77 */
78 class SessionConsumer implements ConsumerEndpointListener {
79
80 /***
81 * The message listener is the reference to a remote client that will
82 * receive the messages.
83 */
84 private JmsMessageListener _listener;
85
86 /***
87 * Maintain a set of ConsumerEndpoint instances, keyed on id.
88 */
89 private final HashMap _consumers = new HashMap();
90
91 /***
92 * Caches all sent messages.
93 */
94 private final SentMessageCache _sent;
95
96 /***
97 * The database service.
98 */
99 private final DatabaseService _database;
100
101 /***
102 * The set of consumer endpoints with messages pending.
103 */
104 private final LinkedList _pending = new LinkedList();
105
106 /***
107 * Determines if the sender is stopping/stopped.
108 */
109 private Flag _stop = new Flag(true);
110
111 /***
112 * Stop/start lock.
113 */
114 private final Object _restartLock = new Object();
115
116 /***
117 * The active consumer lock.
118 */
119 private final Object _removeLock = new Object();
120
121 /***
122 * The consumer currently being dispatched to.
123 */
124 private long _consumerId = -1;
125
126 /***
127 * The maximum number of messages that a dispatch can deliver at any one
128 * time
129 */
130 private final int MAX_MESSAGES = 200;
131
132 /***
133 * The logger.
134 */
135 private static final Log _log = LogFactory.getLog(SessionConsumer.class);
136
137
138 private final SerialTask _runner;
139
/**
 * Construct a new <code>SessionConsumer</code>.
 * <p/>
 * The sent message cache is created for the supplied acknowledgement
 * mode, and a serial task is set up that invokes <code>dispatch()</code>
 * each time it is run.
 *
 * @param ackMode   the message acknowledgement mode, or
 *                  <code>Session.TRANSACTED_SESSION</code>
 *                  if the session is transactional
 * @param database  the database service
 * @param scheduler the scheduler
 */
public SessionConsumer(int ackMode, DatabaseService database,
                       Scheduler scheduler) {
    _sent = new SentMessageCache(ackMode);
    _database = database;

    // the worker simply delegates to dispatch()
    _runner = new SerialTask(new Runnable() {
        public void run() {
            dispatch();
        }
    }, scheduler);
}
161
162 /***
163 * Set the listener for this session.
164 * <p/>
165 * The listener is notified whenever a message for the session is present.
166 *
167 * @param listener the message listener
168 */
169 public synchronized void setMessageListener(JmsMessageListener listener) {
170 _listener = listener;
171 }
172
173 /***
174 * Register a consumer.
175 *
176 * @param consumer the consumer to add
177 */
178 public synchronized void addConsumer(ConsumerEndpoint consumer) {
179 final long id = consumer.getId();
180 _consumers.put(new Long(id), consumer);
181 consumer.setListener(this);
182 }
183
184 /***
185 * Deregister a consumer.
186 *
187 * @param consumerId the consumer identifier
188 * @return the consumer
189 * @throws JMSException if the consumer can't be removed
190 */
191 public ConsumerEndpoint removeConsumer(long consumerId)
192 throws JMSException {
193 ConsumerEndpoint consumer;
194 synchronized (_removeLock) {
195 while (consumerId == _consumerId) {
196 try {
197 _removeLock.wait();
198 } catch (InterruptedException ignore) {
199
200 }
201 }
202 synchronized (this) {
203 consumer = (ConsumerEndpoint) _consumers.remove(
204 new Long(consumerId));
205 if (consumer == null) {
206 throw new JMSException("No consumer with id=" + consumerId);
207 }
208 consumer.setListener(null);
209 }
210 synchronized (_pending) {
211 _pending.remove(consumer);
212 }
213 }
214
215 return consumer;
216 }
217
218 /***
219 * Returns the consumers.
220 *
221 * @return the consumers
222 */
223 public synchronized ConsumerEndpoint[] getConsumers() {
224 return (ConsumerEndpoint[]) _consumers.values()
225 .toArray(new ConsumerEndpoint[0]);
226 }
227
228 /***
229 * Enable or disable asynchronous message delivery for a consumer.
230 *
231 * @param consumerId the consumer identifier
232 * @param enable <code>true</code> to enable; <code>false</code> to
233 * disable
234 * @throws JMSException for any JMS error
235 */
236 public void setAsynchronous(long consumerId, boolean enable)
237 throws JMSException {
238 ConsumerEndpoint consumer = getConsumer(consumerId);
239 consumer.setAsynchronous(enable);
240 if (enable && consumer.getMessageCount() != 0) {
241 messageAvailable(consumer);
242 }
243
244 }
245
/**
 * Stop message delivery.
 * <p/>
 * Sets the stop flag and then stops the dispatch task, all while holding
 * the stop/start lock so that it cannot interleave with {@link #start()}.
 */
public void stop() {
    synchronized (_restartLock) {
        // raise the flag first so in-progress work can observe it
        _stop.set(true);
        _runner.stop();
        _log.debug("stopped delivery");
    }
}
256
257 /***
258 * Start message delivery.
259 */
260 public void start() throws JMSException {
261 synchronized (_restartLock) {
262 _log.debug("start");
263 _stop.set(false);
264 for (Iterator i = _consumers.values().iterator(); i.hasNext();) {
265 ConsumerEndpoint consumer = (ConsumerEndpoint) i.next();
266 if (needsScheduling(consumer)) {
267 queue(consumer);
268 }
269 }
270 try {
271 _runner.schedule();
272 } catch (InterruptedException exception) {
273 _log.error("Failed to start worker", exception);
274 throw new JMSException("Failed to start worker: " + exception);
275 }
276 }
277 }
278
279 /***
280 * Recover the session.
281 * <p/>
282 * This will cause all unacknowledged messages to be redelivered.
283 *
284 * @throws JMSException if the session can't be recovered
285 */
286 public synchronized void recover() throws JMSException {
287 stop();
288 try {
289 _database.begin();
290 _sent.clear();
291 _database.commit();
292 } catch (Exception exception) {
293 rethrow(exception.getMessage(), exception);
294 }
295 start();
296 }
297
298 /***
299 * Commit the sesion.
300 * <p/>
301 * This will acknowledge all sent messages for all consumers.
302 *
303 * @throws JMSException if the session fails to commit
304 */
305 public synchronized void commit() throws JMSException {
306 try {
307 _database.begin();
308 _sent.acknowledgeAll();
309 _database.commit();
310 } catch (OutOfMemoryError exception) {
311 rethrow("Failed to commit session due to out-of-memory error",
312 exception);
313 } catch (Exception exception) {
314 rethrow(exception.getMessage(), exception);
315 }
316 }
317
318 /***
319 * Rollback the session.
320 * <p/>
321 * This will cause all unacknowledged messages to be redelivered.
322 *
323 * @throws JMSException for any error
324 */
325 public synchronized void rollback() throws JMSException {
326 stop();
327 try {
328 _database.begin();
329 _sent.clear();
330 _database.commit();
331 } catch (Exception exception) {
332 rethrow(exception.getMessage(), exception);
333 }
334 start();
335 }
336
337 /***
338 * Return the next available mesage to the specified consumer.
339 * <p/>
340 * This method is non-blocking. If no messages are available, it will return
341 * immediately.
342 *
343 * @param consumerId the consumer identifier
344 * @return the next message or <code>null</code> if none is available
345 * @throws JMSException for any JMS error
346 */
347 public MessageImpl receiveNoWait(long consumerId) throws JMSException {
348 MessageImpl result = null;
349 if (!_stop.get()) {
350 result = doReceive(consumerId, null);
351 }
352 return result;
353 }
354
355 /***
356 * Return the next available message to the specified consumer.
357 * <p/>
358 * This method is non-blocking. However, clients can specify a
359 * <code>wait</code> interval to indicate how long they are prepared to wait
360 * for a message. If no message is available, and the client indicates that
361 * it will wait, it will be notified via the registered {@link
362 * JmsMessageListener} if one subsequently becomes available.
363 *
364 * @param consumerId the consumer identifier
365 * @param wait number of milliseconds to wait. A value of <code>0
366 * </code> indicates to wait indefinitely
367 * @return the next message or <code>null</code> if none is available
368 * @throws JMSException for any JMS error
369 */
370 public MessageImpl receive(long consumerId, long wait) throws JMSException {
371 MessageImpl result = null;
372 Condition condition;
373 if (wait > 0) {
374 condition = TimedCondition.before(wait);
375 } else {
376 condition = new Flag(true);
377 }
378 if (!_stop.get()) {
379 result = doReceive(consumerId, condition);
380 } else {
381 ConsumerEndpoint consumer = getConsumer(consumerId);
382 consumer.setWaitingForMessage(condition);
383 }
384 return result;
385 }
386
387 /***
388 * Browse up to count messages.
389 *
390 * @param consumerId the consumer identifier
391 * @param count the maximum number of messages to receive
392 * @return a list of {@link MessageImpl} instances
393 * @throws JMSException for any JMS error
394 */
395 public List browse(long consumerId, int count) throws JMSException {
396 ConsumerEndpoint consumer = getConsumer(consumerId);
397 if (!(consumer instanceof QueueBrowserEndpoint)) {
398 throw new JMSException("Can't browse messages: invalid consumer");
399 }
400
401 List messages = new ArrayList(count);
402
403 try {
404 _database.begin();
405 for (int i = 0; i < count && !_stop.get();) {
406 MessageHandle handle = consumer.receive(_stop);
407 if (handle == null) {
408 break;
409 }
410 MessageImpl orig = handle.getMessage();
411 if (orig != null) {
412 messages.add(copy(orig, handle));
413 ++i;
414 }
415 }
416 _database.commit();
417 } catch (Exception exception) {
418 rethrow("Failed to browse messages", exception);
419 }
420 return messages;
421 }
422
423 /***
424 * Acknowledge that a message has been processed.
425 *
426 * @param consumerId the identity of the consumer performing the ack
427 * @param messageId the message identifier
428 * @throws JMSException for any error
429 */
430 public synchronized void acknowledge(long consumerId, String messageId)
431 throws JMSException {
432 try {
433 _database.begin();
434 _sent.acknowledge(messageId, consumerId);
435 _database.commit();
436 } catch (Exception exception) {
437 rethrow("Failed to acknowledge message", exception);
438 }
439 }
440
441 /***
442 * Close the consumer.
443 *
444 * @throws JMSException for any eror
445 */
446 public synchronized void close() throws JMSException {
447 _log.debug("close");
448 stop();
449 _listener = null;
450 try {
451 _database.begin();
452 _sent.clear();
453 _database.commit();
454 } catch (Exception exception) {
455 rethrow(exception.getMessage(), exception);
456 }
457 }
458
459 /***
460 * Notifies that a message is available for a particular consumer.
461 *
462 * @param consumer the consumer
463 */
464 public void messageAvailable(ConsumerEndpoint consumer) {
465 if (queue(consumer)) {
466 try {
467 _runner.schedule();
468 } catch (InterruptedException exception) {
469 _log.error("Failed to schedule worker", exception);
470 }
471 }
472 }
473
474 /***
475 * Send messages to the client.
476 */
477 private void dispatch() {
478 final Condition timeout = TimedCondition.after(30 * 1000);
479 Condition done = new Condition() {
480 public boolean get() {
481 return _stop.get() || timeout.get();
482 }
483 };
484
485 _log.debug("dispatch");
486 int sent = 0;
487 while (sent < MAX_MESSAGES && !done.get()) {
488 ConsumerEndpoint consumer;
489 synchronized (_pending) {
490 if (!_pending.isEmpty()) {
491 consumer = (ConsumerEndpoint) _pending.removeFirst();
492 } else {
493 break;
494 }
495 }
496 if (wantsMessages(consumer)) {
497 if (consumer.isAsynchronous()) {
498 if (send(consumer, done)) {
499 ++sent;
500 }
501 if (needsScheduling(consumer)) {
502 queue(consumer);
503 }
504 } else {
505 notifyMessageAvailable();
506 }
507 }
508 }
509 boolean empty;
510 synchronized (_pending) {
511 empty = _pending.isEmpty();
512 }
513 if (!empty && !_stop.get()) {
514
515 try {
516 _runner.schedule();
517 } catch (InterruptedException exception) {
518 _log.error("Failed to reschedule worker", exception);
519 }
520 }
521 _log.debug("dispatch[sent=" + sent + "]");
522 }
523
524 private void notifyMessageAvailable() {
525 try {
526
527 _listener.onMessageAvailable();
528 } catch (RemoteException exception) {
529 _log.debug("Failed to notify client", exception);
530 }
531 }
532
533 private boolean queue(ConsumerEndpoint consumer) {
534 boolean queued = false;
535 if (!_stop.get()) {
536 synchronized (_pending) {
537 if (!_pending.contains(consumer)) {
538 _pending.add(consumer);
539 queued = true;
540 }
541 }
542 }
543 return queued;
544 }
545
546 private boolean send(ConsumerEndpoint consumer, Condition cancel) {
547 boolean sent = false;
548 MessageHandle handle = null;
549 try {
550 _database.begin();
551 try {
552 synchronized (_removeLock) {
553 _consumerId = consumer.getId();
554 }
555 handle = consumer.receive(cancel);
556 if (handle != null) {
557 MessageImpl message = handle.getMessage();
558 if (message != null) {
559
560 message = copy(message, handle);
561
562
563
564
565
566 consumer.setWaitingForMessage(null);
567
568 _sent.preSend(handle);
569 _database.commit();
570
571
572 sent = send(message);
573
574 if (sent) {
575 _database.begin();
576 _sent.postSend(handle);
577 _database.commit();
578 }
579 }
580 } else {
581 _database.commit();
582 }
583 } finally {
584 synchronized (_removeLock) {
585 _consumerId = -1;
586 _removeLock.notify();
587 }
588 }
589 } catch (Exception exception) {
590 cleanup(exception.getMessage(), exception);
591 }
592 if (!sent && handle != null) {
593 try {
594 _database.begin();
595 handle.release();
596 _database.commit();
597 } catch (Exception exception) {
598 cleanup("Failed to release unsent message", exception);
599 }
600 }
601 return sent;
602 }
603
604 /***
605 * Send the specified message to the client.
606 *
607 * @param message the message
608 * @return <code>true</code> if the message was successfully sent
609 */
610 protected boolean send(MessageImpl message) {
611 boolean delivered = false;
612 try {
613
614 delivered = _listener.onMessage(message);
615 if (_log.isDebugEnabled()) {
616 _log.debug("send[JMSMessageID=" + message.getMessageId()
617 + ", delivered=" + delivered + "]");
618 }
619 } catch (RemoteException exception) {
620 _log.info("Failed to notify client", exception);
621 }
622 return delivered;
623 }
624
625 private boolean wantsMessages(ConsumerEndpoint consumer) {
626 boolean result = false;
627 if (consumer.isAsynchronous() || consumer.isWaitingForMessage()) {
628 result = true;
629 }
630 return result;
631 }
632
633 private boolean needsScheduling(ConsumerEndpoint consumer) {
634 boolean result = false;
635 if (wantsMessages(consumer) && consumer.getMessageCount() != 0) {
636 result = true;
637 }
638 return result;
639 }
640
641 private MessageImpl doReceive(long consumerId, final Condition wait)
642 throws JMSException {
643 ConsumerEndpoint consumer = getConsumer(consumerId);
644
645 Condition cancel;
646 if (wait != null) {
647 cancel = new Condition() {
648 public boolean get() {
649 return _stop.get() || !wait.get();
650 }
651 };
652 } else {
653 cancel = _stop;
654 }
655
656 MessageImpl message = null;
657 try {
658 _database.begin();
659 MessageHandle handle = consumer.receive(cancel);
660
661 if (handle != null) {
662
663 message = handle.getMessage();
664 if (message != null) {
665 message = copy(message, handle);
666 }
667 }
668 if (message == null) {
669
670
671 consumer.setWaitingForMessage(wait);
672 } else {
673
674 consumer.setWaitingForMessage(null);
675
676
677
678
679 _sent.preSend(handle);
680 }
681 _database.commit();
682 } catch (Exception exception) {
683 rethrow(exception.getMessage(), exception);
684 }
685 if (_log.isDebugEnabled()) {
686 if (message != null) {
687 _log.debug("doReceive(consumerId=" + consumerId +
688 ") -> JMSMesssageID=" + message.getMessageId());
689 }
690 }
691
692 return message;
693 }
694
695 /***
696 * Helper to copy a message.
697 *
698 * @param message the message to copy
699 * @param handle the handle the message came from
700 * @return a copy of the message
701 * @throws JMSException if the copy fails
702 */
703 private MessageImpl copy(MessageImpl message, MessageHandle handle)
704 throws JMSException {
705 MessageImpl result;
706 try {
707 result = (MessageImpl) message.clone();
708 result.setJMSRedelivered(handle.getDelivered());
709 result.setConsumerId(handle.getConsumerId());
710 } catch (JMSException exception) {
711 throw exception;
712 } catch (CloneNotSupportedException exception) {
713 _log.error(exception, exception);
714 throw new JMSException(exception.getMessage());
715 }
716 return result;
717 }
718
719 /***
720 * Returns the consumer endpoint given its identifier.
721 *
722 * @param consumerId the consumer identifier
723 * @return the consumer endpoint corresponding to <code>consumerId</code>
724 * @throws JMSException if the consumer doesn't exist
725 */
726 private ConsumerEndpoint getConsumer(long consumerId)
727 throws JMSException {
728 ConsumerEndpoint consumer
729 = (ConsumerEndpoint) _consumers.get(new Long(consumerId));
730 if (consumer == null) {
731 throw new JMSException("Consumer not registered: " + consumerId);
732 }
733 return consumer;
734 }
735
736 /***
737 * Helper to clean up after a failed call.
738 *
739 * @param message the message to log
740 * @param exception the exception to log
741 */
742 private void cleanup(String message, Throwable exception) {
743 _log.error(message, exception);
744 try {
745 if (_database.isTransacted()) {
746 _database.rollback();
747 }
748 } catch (PersistenceException error) {
749 _log.warn("Failed to rollback after error", error);
750 }
751 }
752
753 /***
754 * Helper to clean up after a failed call, and rethrow.
755 *
756 * @param message the message to log
757 * @param exception the exception
758 * @throws JMSException the original exception adapted to a
759 * <code>JMSException</code> if necessary
760 */
761 private void rethrow(String message, Throwable exception)
762 throws JMSException {
763 cleanup(message, exception);
764 if (exception instanceof JMSException) {
765 throw (JMSException) exception;
766 }
767 throw new JMSException(exception.getMessage());
768 }
769
770 }